In [None]:
import math
import random
import numpy as np
import timeit
np.seterr(all = 'ignore')

"""The Neural Network part of the code has been adapted from the one provided in GitHub at the following address 
https://github.com/FlorianMuellerklein/Machine-Learning/blob/master/Old/BackPropagationNN.py"""

def sigmoid(x):
    """Logistic function: map ``x`` to 1 / (1 + e^-x)."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

# derivative of sigmoid
def dsigmoid(y):
    """Slope of the logistic function, given its output value ``y``."""
    complement = 1.0 - y
    return complement * y

# using tanh over logistic sigmoid is recommended   
def tanh(x):
    """Hyperbolic tangent of a scalar ``x`` (delegates to ``math.tanh``)."""
    activation = math.tanh(x)
    return activation
    
# derivative for tanh sigmoid
def dtanh(y):
    """Slope of tanh, given its output value ``y`` (i.e. 1 - y^2)."""
    squared = y * y
    return 1 - squared

class MLP_NeuralNetwork(object):
    """
    Basic MultiLayer Perceptron (MLP) network, adapted from the book
    'Programming Collective Intelligence' (http://shop.oreilly.com/product/9780596529321.do).

    Consists of three layers: input, hidden and output. The sizes of input and
    output must match the data; the size of hidden is user defined when
    initializing the network. Hidden nodes use tanh, output nodes use the
    logistic sigmoid, and the weights are updated after every single pattern.
    """

    def __init__(self, input, hidden, output, iterations, learning_rate, momentum, rate_decay):
        """
        :param input: number of input neurons (one bias node is added internally)
        :param hidden: number of hidden neurons
        :param output: number of output neurons
        :param iterations: number of passes over the patterns made by train()
        :param learning_rate: initial step size of the weight updates
        :param momentum: factor applied to the previous pattern's gradient term
        :param rate_decay: the learning rate is divided by (1 + rate_decay)
            after every training iteration
        """
        # initialize parameters
        self.iterations = iterations
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.rate_decay = rate_decay

        # layer sizes
        self.input = input + 1  # add 1 for bias node
        self.hidden = hidden
        self.output = output

        # activations start at 1.0; the last input slot is never overwritten
        # by feedForward and therefore acts as the constant bias input
        self.ai = [1.0] * self.input
        self.ah = [1.0] * self.hidden
        self.ao = [1.0] * self.output

        # create randomized weights
        # use scheme from 'efficient backprop' to initialize weights:
        # zero-mean normal with standard deviation 1 / sqrt(fan-in)
        input_range = 1.0 / self.input ** 0.5
        output_range = 1.0 / self.hidden ** 0.5
        self.wi = np.random.normal(loc=0, scale=input_range, size=(self.input, self.hidden))
        self.wo = np.random.normal(loc=0, scale=output_range, size=(self.hidden, self.output))

        # gradient terms of the previous update, used for the momentum term
        self.ci = np.zeros((self.input, self.hidden))
        self.co = np.zeros((self.hidden, self.output))

    def feedForward(self, inputs):
        """
        Propagate one pattern through the network.

        Every hidden node takes tanh of the weighted sum of the input
        activations (including the bias); every output node takes the sigmoid
        of the weighted sum of the hidden activations.

        :param inputs: sequence with exactly as many values as input neurons
        :return: copy of the output activations (list)
        :raises ValueError: if the number of inputs does not match the network
        """
        if len(inputs) != self.input - 1:
            raise ValueError('Wrong number of inputs')

        # input activations
        for i in range(self.input - 1):  # -1 is to avoid the bias
            self.ai[i] = inputs[i]

        # hidden activations
        for j in range(self.hidden):
            total = 0.0
            for i in range(self.input):
                total += self.ai[i] * self.wi[i][j]
            self.ah[j] = tanh(total)

        # output activations
        for k in range(self.output):
            total = 0.0
            for j in range(self.hidden):
                total += self.ah[j] * self.wo[j][k]
            self.ao[k] = sigmoid(total)

        return self.ao[:]

    def backPropagate(self, targets):
        """
        Update the weights from the activations left by the last feedForward.

        For the output layer
        1. Calculates the difference between output value and target value
        2. Multiplies it by the sigmoid derivative to get the output deltas
        For the hidden layer
        1. Sums the output deltas weighted by the hidden-to-output weights
        2. Multiplies the sum by the tanh derivative to get the hidden deltas
        Both weight matrices are then moved against their gradient, scaled by
        the learning rate, plus the previous gradient term times the momentum.

        :param targets: y values, one per output neuron
        :return: half the summed squared error between targets and the
            outputs computed before this update
        :raises ValueError: if the number of targets does not match the network
        """
        if len(targets) != self.output:
            raise ValueError('Wrong number of targets')

        # calculate error terms for output
        # the delta tell you which direction to change the weights
        output_deltas = [0.0] * self.output
        for k in range(self.output):
            error = -(targets[k] - self.ao[k])
            output_deltas[k] = dsigmoid(self.ao[k]) * error

        # calculate error terms for hidden (uses the weights before any update)
        hidden_deltas = [0.0] * self.hidden
        for j in range(self.hidden):
            error = 0.0
            for k in range(self.output):
                error += output_deltas[k] * self.wo[j][k]
            hidden_deltas[j] = dtanh(self.ah[j]) * error

        # update the weights connecting hidden to output
        for j in range(self.hidden):
            for k in range(self.output):
                change = output_deltas[k] * self.ah[j]
                self.wo[j][k] -= self.learning_rate * change + self.co[j][k] * self.momentum
                self.co[j][k] = change

        # update the weights connecting input to hidden
        for i in range(self.input):
            for j in range(self.hidden):
                change = hidden_deltas[j] * self.ai[i]
                self.wi[i][j] -= self.learning_rate * change + self.ci[i][j] * self.momentum
                self.ci[i][j] = change

        # calculate error
        error = 0.0
        for k in range(len(targets)):
            error += 0.5 * (targets[k] - self.ao[k]) ** 2
        return error

    def test(self, patterns):
        """
        Append the prediction for every pattern to 'forecast.csv'.

        One line is written per pattern, formatted as a Python list of plain
        floats (e.g. ``[0.5123]``). Values are converted with float() so the
        text does not depend on how the installed NumPy prints its scalars.

        :param patterns: sequence of [inputs, targets] pairs; only the inputs
            are used
        """
        # open once for the whole batch; the with-block closes the file
        with open('forecast.csv', 'a') as forecast:
            for p in patterns:
                prediction = [float(value) for value in self.feedForward(p[0])]
                forecast.write(str(prediction) + '\n')

    def train(self, patterns):
        """
        Train on the patterns for ``self.iterations`` passes.

        After every pattern the error accumulated so far in the current pass
        is appended as one line to 'error.csv'. Every 10th pass the total
        error of the pass is printed. The learning rate is decayed after each
        pass.

        :param patterns: sequence of [inputs, targets] pairs
        """
        # open once for the whole run; the with-block closes the file
        with open('error.csv', 'a') as errorfile:
            for i in range(self.iterations):
                error = 0.0
                for p in patterns:
                    inputs = p[0]
                    targets = p[1]
                    self.feedForward(inputs)
                    error += self.backPropagate(targets)
                    errorfile.write(str(error) + '\n')
                if i % 10 == 0:
                    print('error %-.5f' % error)
                # learning rate decay: lr / (1 + decay). Written in this form
                # so that a learning rate of 0 does not divide by zero.
                self.learning_rate = self.learning_rate / (1.0 + self.rate_decay)

    def predict(self, X):
        """
        Return the list of predictions (one output list per row of X).
        """
        return [self.feedForward(p) for p in X]

In [None]:
def run(n=18):
    """
    Load the price data, train the network on sliding windows and append the
    forecasts to 'forecast.csv'.

    :param n: window length; the first n-1 prices of each window are the
        network inputs and the n-th price is the target. Defaults to 18.
    :raises ValueError: if n < 2 or the selected period has no more than n rows
    """
    import timeit
    import pandas as pd

    if n < 2:
        raise ValueError('n must be at least 2 (n-1 inputs and 1 target)')

    def load_data():
        """Return the patterns as [[[x1, ..., x(n-1)], [y]], ...]."""
        # load the data using pandas, remove the possible duplicates (checking
        # the 'Date' column) and store the cleaned file, which later cells
        # read back from './Data_clean.csv'
        raw_data = pd.read_csv('./Data.csv')  # dataset location
        clear = raw_data.drop_duplicates(['Date'])
        clear.to_csv('./Data_clean.csv', index=False)  # clean dataset location
        data = pd.read_csv('./Data_clean.csv')
        data['Date'] = pd.to_datetime(data['Date'])

        # define the period of interest for the analysis
        period = ('2016-04-01 00:00:00', '2016-06-30 23:50:00')
        mask = (data['Date'] >= period[0]) & (data['Date'] <= period[1])
        prices = data.loc[mask, 'Last Price'].values

        if len(prices) <= n:
            raise ValueError('Not enough rows in the selected period to build windows of length %d' % n)

        # reshape the series into overlapping windows of n consecutive prices
        # NOTE(review): range(len - n) leaves out the very last window; kept
        # because the later cells build their windows the same way.
        windows = np.array([prices[x:x + n] for x in range(len(prices) - n)])

        # scale the data: subtract the minimum, divide by the maximum
        # NOTE(review): this is (x - min) / max, not (x - min) / (max - min);
        # kept because the later cells undo it with x * max + min.
        a = windows.min()
        b = windows.max()
        windows = (windows - a) / b

        # last column is the target, the first n-1 columns are the inputs
        y = windows[:, n - 1:n]
        print('Y - shape: ' + str(y.shape))

        features = windows[:, :n - 1]
        print('Data - shape: ' + str(features.shape))

        return [[features[i, :].tolist(), y[i].tolist()] for i in range(features.shape[0])]

    X = load_data()
    start = timeit.default_timer()

    # run the NN with the given parameters; the input size follows from n
    NN = MLP_NeuralNetwork(n - 1, 10, 1, iterations=50, learning_rate=0.5, momentum=0.5, rate_decay=0.01)

    NN.train(X)
    NN.test(X)
    stop = timeit.default_timer()
    print('Run time: ' + str(stop - start))


run()

In [None]:
import random
import numpy as np
import timeit
import pandas as pd
import matplotlib.pyplot as plt
import csv

"""
As before, reimports the dataset, clears the duplicates and convert the 'Date' 
column to the format 'YYYY-MM-DD hh:mm:ss'
"""

# Period whose forecasts are compared against the real prices.
period = ('2016-05-20 00:00:00','2016-06-30 23:50:00')
print('Period Considered: ' + period[0] +' to '+ period[1])
data = pd.read_csv('./Data_clean.csv')
data['Date'] = pd.to_datetime(data['Date'])
time = data['Date']  # full date column; filtered again by the simulation cell
mask = (time >= period[0]) & (time <= period[1])
data = data.loc[mask]

time_used = data['Date']
data = data['Last Price']

# The following steps repeat the windowing and scaling done before training.

n = 18  # window length (must be the same as the one used for training)
new_data = []
for x in range(0, len(data) - n):
    new_data.append(data[x:x+n])

data = np.array(new_data)

# NOTE(review): a and b are the min/max of THIS period's windows; if the
# network was trained on a different period its outputs were scaled with
# different values - confirm this is intended.
a = data.min()
b = data.max()
data = data - a  # scale the data: subtract the minimum, divide by the maximum
data = data / b
y = data[:, n-1:n]    # real (scaled) value of the last price of each window
data = data[:, :n-1]  # the n-1 inputs of each window

# ff is the array containing the forecasted data, y the one containing the
# real data. Each line of forecast.csv looks like "[0.5123]".

ff = pd.read_csv('forecast.csv', header = None)  # file created when running the NN
ff[0] = ff[0].astype(str).str[1:-1]  # strip the surrounding brackets
# Plain astype: the former raise_on_error argument no longer exists in
# pandas, and a value that cannot be parsed would break the arithmetic
# below anyway, so failing here gives the clearer error.
ff[0] = ff[0].astype('float64')
ff = np.array(ff)
k = len(data)
ff = ff[len(ff)-k:len(ff)]  # keep only the last k forecasts

# Use a and b (the min and max found above) to unscale the data in order to
# better visualise them.

ff = ff*b
ff = ff+a
ffcsv = pd.DataFrame(ff)
ffcsv.to_csv('forecasted'+str(period[0])+'-'+str(period[1])+'-n='+str(n)+'.csv')
y = y*b
y = y+a
ycsv = pd.DataFrame(y)
ycsv.to_csv('real'+str(period[0])+'-'+str(period[1])+'-n='+str(n)+'.csv')

# Plot for the comparison of the two time series.

plt.plot(ff, 'g', label='Forecasted')
plt.plot(y,'r', label='Real')
plt.title('Period Considered: ' + period[0] +' to '+ period[1]+' - (n = '+str(n)+')')
plt.legend()
plt.savefig(str(period[0])+'-'+str(period[1])+'-n='+str(n)+'.png')
plt.show()

In [None]:
"""I first decide the data in which to run the investment simulation"""

# Period over which the investment simulation is run.
period_sim = ('2016-06-01 00:00:00','2016-06-30 23:50:00')
print('Period Considered: ' + period_sim[0] +' to '+ period_sim[1])

mask = (time >= period_sim[0]) & (time <= period_sim[1])
time = time.loc[mask]  # note: overwrites the global date column with the subset

# The length of this subset is used to keep only the last l rows of y and ff.

l = len(time)
print(l)
y_test = y[len(y)-l:len(y)]
ff_test = ff[len(ff)-l:len(ff)]
print(len(y_test))
print(len(ff_test))

# Signal from the forecast: 1 if the forecast at t+1 is above the one at t,
# otherwise -1. An unchanged forecast is counted as -1, the same way the real
# series is handled below; without a branch for that case the list would come
# out shorter than l and every later signal would be shifted.
indicator = []

for j in range(0,l-1):
    if ff_test[j+1]-ff_test[j]>0:
        indicator.append(1)
    else:
        indicator.append(-1)

indicator.insert(0,1)  # placeholder for the first row, which has no predecessor

# Same on the real data set, to check the performance of the algorithm
# (% of times it got the right movement).

indicator_real = []

for j in range(0,l-1):
    if y_test[j+1]-y_test[j]>0:
        indicator_real.append(1)
    elif y_test[j+1]-y_test[j]<=0:
        indicator_real.append(-1)

indicator_real.insert(0,1)  # placeholder, as above

# Relative change between two consecutive values in the real data.

returns = []

for j in range(0,l-1):
        returns.append(((y_test[j+1])/y_test[j])-1)

returns.insert(0,1)  # placeholder so that returns lines up with the indicators
print(len(returns))

# newindicator copies indicator, but puts a 1 wherever the real return at the
# same index is exactly 0. It has one entry less than indicator.

newindicator = []
for i in range(0,len(indicator)-1):
    if returns[i] == 0:
        newindicator.append(1)
    elif returns[i] != 0:
        newindicator.append(indicator[i])


x = 1000 #initial investment

# Profit per step and total profit.

# z: position (within returns[1:]) of the first non-zero real return;
# next() raises StopIteration if every return is zero
z = next(i for i, v in enumerate(returns[1:len(returns)]) if v != 0)
# NOTE(review): the list starts out holding its own indices; entries before z
# are never overwritten and still take part in the sum and in the curve
# below - confirm this is intended.
a = [i for i in range(0,len(newindicator)-1)]

a[z] = x*newindicator[z+1]*(returns[z+1])

for j in range(z+1,len(newindicator)-1):
    a[j]=(((a[j-1])*newindicator[j+1]*(1+returns[j+1])))

profit=np.sum(a)
# prepend z+1 copies of the initial investment, then accumulate from there on
for i in range(0,z+1):
    a.insert(i,x)
for i in range(z+1,len(a)):
    a[i] = a[i] + a[i-1]

totalprofit = ((x+profit)/x)-1
tot_profit = 'Total profit: {percent:.4%}'.format(percent=totalprofit)
print(tot_profit)

# Number of times the price movement prediction was right.

up_down = []
for i in range(0, l-1):
    if indicator[i] == indicator_real[i]:
        up_down.append(1)
    elif indicator[i] != indicator_real[i]:
        up_down.append(0)

k = up_down.count(1)
# NOTE(review): l-1 comparisons are made (the first one compares the two
# placeholders) but the count is divided by l - confirm the denominator.
precision = k/len(indicator_real)
price_movement = 'Price movement prediction accuracy: {percent:.2%}'.format(percent=precision)
print(price_movement)

#Plot the profit graph

plt.plot(a, 'g')
plt.title('Period Considered: ' + period_sim[0] +' to '+ period_sim[1]+' (n = '+str(n)+')')
plt.annotate(tot_profit+' - '+price_movement, (0,0), (0, -25), xycoords='axes fraction', textcoords='offset points', va='top')
plt.savefig('Profit'+' '+str(period_sim[0])+'-'+str(period_sim[1])+'-n='+str(n)+'.png')
plt.show()

In [None]:
"""Create a list of 1s and -1s according to the price movement of the forecasted data: 
if at t+n the value is > than the one at t by a certain amount p, I write 1, and vice versa
"""
import os

# Threshold strategy: a signal is produced only when the forecast has moved
# by at least p (relative) since the last signal.
p = 0.003
indicator1 = []
beg = ff_test[0]  # forecast value at the last signal

for i in range(1,l-2):
    kk = (ff_test[i]/beg)-1  # relative move since the last signal
    if kk >= p or kk<= -p:
        beg = ff_test[i]
        if kk >= p:
            indicator1.append(1)
        elif kk <= -p:
            indicator1.append(-1)
    else:
        indicator1.append(0)  # move below the threshold: no signal

indicator1.insert(0,1)  # placeholder for the first row

# Direction of the real data, 1 for a rise and -1 otherwise.
# (Not used further down in this cell.)

indicator1_real = []

for j in range(1,l-2):
    if y_test[j+1]-y_test[j]>0:
        indicator1_real.append(1)
    elif y_test[j+1]-y_test[j]<=0:
        indicator1_real.append(-1)

indicator1_real.insert(0,1)

# Relative change between two consecutive values in the real data.

returns1 = []

for j in range(0,l-1):
        returns1.append(((y_test[j+1])/y_test[j])-1)

returns1.insert(0,1)  # placeholder so that returns1 lines up with the indicators
print(len(returns1))

# returns2 keeps the real return where a signal was produced and 0 elsewhere.

returns2 = []
for i in range(0,len(indicator1)-1):
    if indicator1[i] == 0:
        returns2.append(0)
    elif indicator1[i] != 0:
        returns2.append(returns1[i])

# newindicator1 copies indicator1, but puts a 1 wherever returns2 is 0.
# NOTE(review): steps without a signal therefore enter the profit loop below
# with a factor of 1, and that loop reads returns1 rather than returns2 -
# confirm this is intended.

newindicator1 = []
for i in range(0,len(indicator1)-1):
    if returns2[i] == 0:
        newindicator1.append(1)
    elif returns2[i] != 0:
        newindicator1.append(indicator1[i])


x = 1000 #initial investment

# Profit per step and total profit.

# z: position (within returns1[1:]) of the first non-zero real return;
# next() raises StopIteration if every return is zero
z = next(i for i, v in enumerate(returns1[1:len(returns1)]) if v != 0)
# NOTE(review): the list starts out holding its own indices; entries before z
# are never overwritten and still take part in the sum and in the curve
# below - confirm this is intended.
a = [i for i in range(0,len(newindicator1)-1)]

a[z] = x*newindicator1[z+1]*(returns1[z+1])

for j in range(z+1,len(newindicator1)-1):
    a[j]=(((a[j-1])*newindicator1[j+1]*(1+returns1[j+1])))

profit=np.sum(a)
# prepend z+1 copies of the initial investment, then accumulate from there on
for i in range(0,z+1):
    a.insert(i,x)
for i in range(z+1,len(a)):
    a[i] = a[i] + a[i-1]

totalprofit = ((x+profit)/x)-1
tot_profit = 'Total profit: {percent:.4%}'.format(percent=totalprofit)
print(tot_profit)

# Number of orders placed (counts the 1 entries of indicator1, which include
# the placeholder inserted at position 0; -1 entries are not counted).

k = 'Total Orders '+str(indicator1.count(1))
print(k)
limit = 'Investment Limit: {percent:.2%}'.format(percent=p)
print(limit)


plt.plot(a, 'g')
# NOTE(review): title and file name say "March" while this cell uses the
# ff_test / y_test slices prepared by an earlier cell - confirm the label.
plt.title('Profit March with Limit - ' +'(n = '+str(n)+')')
plt.annotate(tot_profit+' - '+k+' - '+limit, (0,0), (0, -25), xycoords='axes fraction', textcoords='offset points', va='top')
# savefig does not create missing directories, so make sure ./Report exists
os.makedirs('./Report', exist_ok=True)
plt.savefig('./Report/Profit_Limit_March_'+'(n='+str(n)+').png')
plt.show()